Reactive Extensions (Rx)
Reactive Extensions(Rx) は、2009年頃、クラウド・プログラマビリティ・チームにおいて、"Volta" プロジェクトの階層分割の要件をサポートするために、次のような観点から考案されました。クライアントとサービスの境界を越えたアプリケーションの分割をサポートするためには非同期の通信が必要で、アプリケーションのクライアント層をJavaScriptにリターゲットする場合、クライアントとサービスの間のすべての通信は、AJAXを使って非同期に行われなければなりません。"Volta" プロジェクトがスタートした当時、C#やVisual Basicなどの言語には、asyncやawaitはもちろんのこと、Task<T>という形で未来を抽象化したものさえありませんでした。そのため、クライアント側のコードをCPS(Continuation Passing Style)に書き換える必要がありました。クラウドの境界を越えたイベントハンドラーだけでなく、単一値の非同期計算もサポートしなければなりませんでした。
Reactive Extensions (Rx) was conceived around 2009 in the Cloud Programmability Team to support the tier-splitting requirement of the “Volta” project, from the following observation. In order to support splitting of an application across the client/service boundary, asynchronous communication is required. When retargeting the client tier of the application to JavaScript, all communication between client and service has to take place asynchronously using AJAX. When the “Volta” project was started, languages such as C# and Visual Basic lacked async and await and even the fundamental abstraction of a future in the form of Task<T>. This required the rewrite of client-side code to continuation passing style (CPS). We had to support both single-value asychronous computation, as well as event handlers across cloud boundaries:
code:C#
class Dashboard : Form
{
public void Form_Load(object sender, EventArgs e)
{
int res = server.Add(1, 2);
lblSum.Text = res.ToString();
server.StockTicks += tick =>
this.Invoke(() => lstTicks.Add(tick.ToString()));
}
}
class Service
{
public int Add(int a, int b) => a + b;
public event Action<StockTick> StockTicks;
}
もしこの技術を今日実装するとしたら、Addの呼び出しを非同期に変換してTask<int>を返すことに疑問の余地はないでしょう。なぜなら、.NETイベントはファーストクラスのオブジェクトではなく、単なるメタデータの構成要素だからです。委譲型を使ってオブジェクトに変換できるメソッドとは異なり、イベントはオブジェクト表現を持ちません。このため、単一の値や複数の値にかかわらず、境界を越えた非同期処理の「接着剤」として機能するフレームワークを導入することになりました。
If we were to implement this technology today, there wouldn’t be any doubt to translate the Add call to an asynchronous one, returning a Task<int>. However, for the event handler we still wouldn’t have a good language-integrated solution, because .NET events are merely metadata constructs rather than first-class objects. Unlike a method which can be converted to an object using a delegate type, an event does not have an object representation. This led to the introduction of a framework to act as the “glue” for cross-boundary asynchrony, regardless of whether it involves a single value or multiple values.
興味深いのは、Erik Meijer氏のMSRでの元博士課程の学生の一人であるDaan Leijen氏が、.NET用のフューチャーズライブラリを開発していたことです。このライブラリは、Joe Duffy氏、Stephen Toub氏らによってParallel Extensions for .NETとして製品化されました。Reactive Extensionsは、Parallel Extensionsプロジェクトにならい「Extensions」という接尾辞を採用していますが、これは同時期に作られたことによるものです。
An interesting tidbit is that one of Erik Meijer’s former doctorate students at MSR - Daan Leijen - worked on a futures library for .NET, which got productized in the Parallel Extensions for .NET by Joe Duffy, Stephen Toub, et al. Today, these libraries are part of the .NET Framework and include types such as Task<T>. Reactive Extensions borrows the “Extensions” suffix from the Parallel Extensions project because they were created around the same time.
この接着剤は、IEvent<T>のようなインターフェースで何度も失敗した後、最終的にRxとなり、最終的には、.NET Frameworkに追加されたIObservable<T>とIObserver<T>の抽象化にたどり着きました。
This glue ultimately became Rx, after a number of false starts with interfaces such as IEvent<T>. Finally, we arrived at the IObservable<T> and IObserver<T> abstractions which got added to the .NET Framework eventually:
code:C#
namespace System
{
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
}
これらのインターフェースの詳細な説明については,reactivex.ioとChannel 9の豊富なビデオを参照してください。本質的には、observable sequenceはイベントのデータソースを表し、observerはそのようなイベントのレシーバーを表します。Subscribeメソッドは、observerをobservable sequenceにアタッチして、イベントの受信を開始します。このメソッドが返すIDisposableオブジェクトは、イベントの受信が不要になったときにサブスクリプションを破棄するために使用できます。観測可能なシーケンスとそのサブスクリプションハンドルは、それぞれIObservable<T>とIDisposable上の代数学を使用して構成することができるので、このアプローチは、イベントハンドラの追加と削除に比べてはるかに構成的です。
For a detailed explanation of these interfaces, see reactivex.io and the rich set of videos on Channel 9. In essence, an observable sequence represents a data source for events, while an observer represents the receiver of such events. The Subscribe method attaches an observer to an observable sequence in order for it to start receiving events. The IDisposable object returned by this method can be used to dispose the subscription when receiving events is no longer desired. This approach is much more compositional compared to adding and removing event handlers, because both observable sequences and their subscription handles can be composed using algebras on IObservable<T> and IDisposable, respectively.
Rxライブラリは、IObservable<T>にLINQスタイルのクエリ演算子を提供する拡張メソッドのセットを提供します。フィルタリング,投影,集約などの標準的なクエリ演算子に加えて,Rxの演算子には時間ベースの演算子も含まれています.例えば、1時間の平均株価を計算する例を以下に示します。
The Rx library provides a set of extension methods for IObservable<T> that provide LINQ-style query operators. Besides standard query operators for filtering, projection, aggregation, etc. the set of Rx operators also includes time-based ones, for example to sample a sequence of events, to throttle the flow of events, to create time-based windows of events, etc. An example to compute one-hour average stock prices is shown below:
code:C#
IObservable<decimal> GetAverageStockValuePerHour(IObservable<StockTick> stocks, string symbol)
{
return from oneHour in (from stock in stocks
where stock.Symbol == symbol
select stock.Price)
.Window(TimeSpan.FromHours(1))
from average in oneHour.Average()
select average;
}
IObservable<T>およびIObserver<T>インターフェースがIEnumerable<T>およびIEnumerator<T>インターフェースの数学的双対であることに気付いたMSRのGavin Bierman氏と共同で、Erik Meijer氏がインターフェース、そのセマンティクス、およびクエリ演算子のRx代数の形式化を推進しました。前者のインターフェースはプルベースのシーケンス(ブロック化されたforeachループを使用して消費される)を扱い、後者のインターフェースはプッシュベースのシーケンス(Subscribeによって提供される非同期コールバックを使用して消費される)を扱います。
Formalization of the interfaces, their semantics, and the Rx algebra of query operators was driven by Erik Meijer in collaboration with Gavin Bierman at MSR who realized that the IObservable<T> and IObserver<T> interfaces are the mathematical dual of the IEnumerable<T> and IEnumerator<T> interfaces. The former family of interfaces deals with pull-based sequences (consumed using blocking foreach loops), while the latter family of interfaces deals with push-based sequences (consuming using asynchronous callbacks supplied by Subscribe).